local server = require "resty.websocket.server"
local redis = require "resty.redis"
-- cjson 模块
local cjson = require "cjson";

--- Finish the current request with HTTP 200.
-- Also used as the ngx.on_abort callback in this script.
-- @param is_ws  when nil (argument omitted), ngx.eof() is called first;
--               any non-nil value skips that step
-- @return nil
local function exit(is_ws)
    local skip_eof = (is_ws ~= nil);
    if not skip_eof then
        ngx.eof();
    end

    -- Synchronous flush, then end the request.
    ngx.flush(true);
    ngx.exit(ngx.HTTP_OK);

    return nil;
end

-- Register exit() as the callback to run when the client aborts the connection.
-- If registration fails, record the reason before quitting so the early exit
-- is not silent.
local ok, err = ngx.on_abort(exit);
if not ok then
    ngx.log(ngx.ERR, "failed to register the on_abort callback: ", err);
    return exit();
end
 


-- Chat room id (currently a fixed value) and the Redis channel name derived from it.
local channel_id = 800;
local channel_name = string.format("chat_%s", tostring(channel_id));

-- Create the server-side WebSocket object used by the rest of this script.
local ws_options = {
    max_payload_len = 65535,
    timeout = 5000,
};
local wb, err = server:new(ws_options);

-- Without a WebSocket object nothing else can run: log the reason and stop.
if not wb then
    ngx.log(ngx.ERR, "failed to new websocket: ", err);
    return exit();
end
 

--[[
- @desc   Debug helper: flattens a Lua value into "path : value" lines and
-         prints them under a banner line.
- @param  v  any  Value to dump. Tables are walked recursively; a table that
-                 contains itself is reported as "<cycle>" instead of being
-                 descended into again.
- @return nil     (the text is emitted through print())
--]]
function dump(v)
    local lines = { '======== Lib:Dump Content ========' };
    -- Tables on the current descent path, used to stop on self-references.
    local active = {};

    -- Local helper (previously a lazily created global) that appends one line
    -- per non-table leaf reachable from `value`.
    local function walk(value, path)
        if type(value) ~= "table" then
            lines[#lines + 1] = path .. " : " .. tostring(value);
            return;
        end
        if active[value] then
            lines[#lines + 1] = path .. " : <cycle>";
            return;
        end

        active[value] = true;
        for key, item in pairs(value) do
            -- tostring() keeps boolean/table/function keys from raising a
            -- concatenation error.
            walk(item, path .. "[" .. tostring(key) .. "]");
        end
        active[value] = nil;
    end

    walk(v, "");
    print(table.concat(lines, "\n"));
end

 
---- Create (or reuse) the Redis client registered under `key` in ngx.ctx.
-- @param key  ngx.ctx slot name; a falsy key yields nil
-- @return the Redis client object. When connect() fails the client is still
--         returned (so existing callers can keep calling methods on it and
--         see their errors), followed by the connect error as a second
--         value; in that case nothing is cached, so the next call retries.
local getRedis = function (key)
    if not key then
        return nil;
    end

    -- Reuse the client already stored for this key, if any.
    local cached = ngx.ctx[key];
    if cached then
        return cached;
    end

    dump('-------------创建redis实例---------------------');
    local red = redis:new();
    red:set_timeout(1000) -- timeout value of 1000 (1 sec)
    local ok, err = red:connect("127.0.0.1", 6379);
    if not ok then
        ngx.log(ngx.ERR, "failed to connect redis: ", err);
        -- Do not store a client that never connected: caching it would hand
        -- the same dead handle to every later caller.
        return red, err;
    end

    ngx.ctx[key] = red;
    return red;
end


-- Body of the subscriber light thread: subscribes to `channel_name` on its own
-- Redis client and forwards every reply, JSON-encoded, to the WebSocket client.
-- Ends the request through exit() when subscribing, reading or sending fails.
-- NOTE(review): this thread and the main receive loop both write to `wb`;
-- confirm that concurrent sends on the same WebSocket are acceptable.
local push = function()
    local red = getRedis('redisfn1');

    -- Subscribe to the chat channel.
    local res, err = red:subscribe(channel_name);
    if not res then
        ngx.log(ngx.ERR, "failed to sub redis: ", err);
        wb:send_close();
        return exit();
    end

    -- Keep reading replies and forward each one to the client as soon as it
    -- arrives. read_reply() itself waits for data, so no extra sleep is
    -- needed between iterations.
    while true do
        local reply, read_err = red:read_reply();
        if reply then
            local bytes, send_err = wb:send_text(cjson.encode(reply));
            if not bytes then
                wb:send_close();
                ngx.log(ngx.ERR, "failed to send text: ", send_err)
                return exit();
            end
        elseif read_err ~= "timeout" then
            -- A read timeout only means no message arrived within the
            -- configured timeout; any other error means the subscription is
            -- unusable, so stop instead of polling a dead connection forever.
            ngx.log(ngx.ERR, "failed to read redis reply: ", read_err);
            wb:send_close();
            return exit();
        end
    end
end
 
 
-- Run the Redis subscriber (push) as a light thread alongside the receive
-- loop below.
local co = ngx.thread.spawn(push);

-- Main loop: receive WebSocket frames from the client and react per frame type.
while true do
    -- Read the next frame.
    local data, typ, err = wb:recv_frame();

    -- The connection is broken: give up.
    if wb.fatal then
        ngx.log(ngx.ERR, "failed to receive frame: ", err);
        return exit();
    end

    if not data then
        -- No frame was received (and the connection is not marked fatal):
        -- probe the client with a ping.
        local bytes, err = wb:send_ping();
        if not bytes then
            ngx.log(ngx.ERR, "failed to send ping: ", err);
            return exit();
        end

    elseif typ == "close" then
        -- Client asked to close: leave the loop and shut down below.
        break;

    elseif typ == "ping" then
        -- Answer client pings.
        local bytes, err = wb:send_pong();
        if not bytes then
            ngx.log(ngx.ERR, "failed to send pong: ", err);
            return exit();
        end

    elseif typ == "pong" then
        -- Nothing to do for pong frames.
        --ngx.log(ngx.ERR, "client ponged")

    elseif typ == "text" then
        -- Publish the received message to the Redis channel; a failure is
        -- logged but does not end the session.
        local red = getRedis('redisfn2');
        local res, err = red:publish(channel_name, data)
        if not res then
            ngx.log(ngx.ERR, " 接收消息写入redis错误 failed to publish redis: ", err)
        end
    end

    -- Pause between iterations; this limits inbound handling to one frame
    -- per half second.
    ngx.sleep(0.5);
end

wb:send_close();
-- push() loops until one of its own operations fails, so waiting on `co` here
-- would keep this request (and its Redis subscription) alive long after the
-- client has gone. End the request now; exit() calls ngx.exit(), which stops
-- the handler without waiting for the subscriber thread.
return exit();
